/*
 * Dijjer - A Peer to Peer HTTP Cache
 * Copyright (C) 2004,2005 Change.Tv, Inc
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
package dijjer.io.xfer;

import java.util.Iterator;
import java.util.LinkedList;
import dijjer.Dijjer;
import dijjer.io.comm.AbstractPeer;
import dijjer.io.comm.AbstractUdpSocketManager;
import dijjer.io.comm.DMT;
import dijjer.io.comm.Message;
import dijjer.io.comm.MessageFilter;
import dijjer.io.comm.Peer;
import dijjer.io.comm.UdpSocketManager;
import dijjer.io.comm.WebPeer;
import dijjer.util.BitArray;
import dijjer.util.logging.Logger;

/**
 * @author ian
 * 
 * To change the template for this generated type comment go to Window - Preferences - Java - Code Generation - Code and
 * Comments
 */
public class BlockTransmitter {

	public static final int WAIT_AFTER_ALL_SENT = 10000;
	AbstractUdpSocketManager _usm;
	AbstractPeer _dest;
	int _uid;
	PartiallyReceivedBlock _prb;
	boolean _allReceived = false;
	BitArray _sent = new BitArray(Dijjer.PACKETS_IN_BLOCK);
	Thread _senderThread;
	PacketThrottle _throttle;
	long _lastPacketSendTime;

	public BlockTransmitter(AbstractUdpSocketManager usm, AbstractPeer destination, int uid, PartiallyReceivedBlock source) {
		_usm = usm;
		_dest = destination;
		_uid = uid;
		_prb = source;
		_throttle = PacketThrottle.getThrottle(_dest);
	}

	public int getNumSent() {
		int ttl = 0;
		for (int x = 0; x < _sent.getSize(); x++) {
			if (_sent.bitAt(x)) {
				ttl++;
			}
		}
		return ttl;
	}

	public void send() {
		startReceiverThread();
		_prb.addListener(new PartiallyReceivedBlock.PacketReceivedListener() {

			public void packetReceived(int packetNo) {
				synchronized (BlockTransmitter.this) {
					BlockTransmitter.this.notify();
				}
			}

			public void receiveAborted(int reason, String description) {
				_usm.send(_dest, DMT.createSendAborted(_uid, description));
				synchronized (BlockTransmitter.this) {
					BlockTransmitter.this.notify();
				}
			}
		});
		sendloop: while (!isFinished()) { synchronized(this) {
			// Synchronize on this for the entire loop. This way we don't have to check on
			// isFinished so much since it will only change between loops and during the wait calls.
			// Also, if we have to wait at all we start through the loop again to see if the delay
			// has been updated with a missing packet message.

			// Wait until we are allowed to send a packet according to throttle
			long lTime = System.currentTimeMillis();
			long currentDelay = lTime - _lastPacketSendTime;
			long remainingDelay = _throttle.getDelay() - currentDelay;
			if (remainingDelay > 50) {
				Logger.debug("CCC timeout:"+remainingDelay);
				try {
					this.wait(remainingDelay);
				} catch (InterruptedException e) {
					if (isFinished()) break sendloop;
				}
				continue sendloop;
			}
			lTime = System.currentTimeMillis() - lTime;
			if (lTime > 3000) {
				Logger.warning("Waited " + lTime + "ms due to bandwidth limiting (uid: " + _uid + ")");
			}
			// Check to see if any packets are available to send, or wait until they are or
			// the PRB is aborted
			int nextToSend = -1;
			lTime = System.currentTimeMillis();
			for (int x = 0; x < Dijjer.PACKETS_IN_BLOCK; x++) {
				if (_prb.isReceived(x) && !_sent.bitAt(x)) {
					nextToSend = x;
					break;
				}
			}
			if (nextToSend == -1) {
				try {
					this.wait();
				} catch (InterruptedException e) {
					if (isFinished()) break sendloop;
				}
				continue sendloop;
			}
			lTime = System.currentTimeMillis() - lTime;
			if (lTime > 3000) {
				Logger.warning("Waited " + lTime + "ms due to suitable packet unavailability (uid: " + _uid + ")");
			}
			_lastPacketSendTime = System.currentTimeMillis();
			_throttle.notifyOfPacketSent();
			_usm.send(_dest, DMT.createPacketTransmit(_uid, nextToSend, _sent, _prb.getPacket(nextToSend)));
			_sent.setBit(nextToSend, true);
		}} //end synchronized and sendloop
	}

	protected boolean isFinished() {
		if (_allReceived) {
			return true;
		}
		if (_prb.isAborted()) {
			return true;
		}
		if ((getNumSent() == Dijjer.PACKETS_IN_BLOCK)
				&& (System.currentTimeMillis() - _lastPacketSendTime > WAIT_AFTER_ALL_SENT)) {
			return true;
		}
		return false;
	}

	protected void startReceiverThread() {
		(new Thread() {

			public void run() {
				while (!isFinished()) {
					Message msg = _usm.waitFor((MessageFilter.create(10000, DMT.missingPacketNotification)
							.addType(DMT.allReceived)).setField(DMT.UID, _uid));
					if (msg != null) {
						if (msg.getSpec().equals(DMT.missingPacketNotification)) {
							if (_prb.isAborted()) {
								_usm.send(_dest, DMT.createSendAborted(_prb.getAbortReason(), _prb
										.getAbortDescription()));
							} else {
								LinkedList missing = (LinkedList) msg.getObject(DMT.MISSING);
								if (missing.size() > 50) {
									Logger.warning("Received large missing packet notification from " + _dest
											+ " of size " + missing.size() + " having sent " + getNumSent());
								}
								_throttle.notifyOfPacketLoss(missing.size());
								for (Iterator i = missing.iterator(); i.hasNext();) {
									Integer packetNo = (Integer) i.next();
									if (_prb.isReceived(packetNo.intValue())) {
										_sent.setBit(packetNo.intValue(), false);
									}
								}
							}
						} else if (msg.getSpec().equals(DMT.allReceived)) {
							_allReceived = true;
						}
						synchronized (BlockTransmitter.this) {
							BlockTransmitter.this.notify();
						}
					}
				}
			}
		}).start();
	}
}
